socket-io and rust
本质上讲socket-io-client就是在websocket上进行了薄薄的封装.最简client代码如下
const WebSocket = require("ws");
const EngineIoParser = require("../../../engine.io-parser");
const SocketIoParser = require("../../../socket.io-parser");
const ws = new WebSocket(
"ws://localhost:3000/socket.io/?EIO=3&transport=websocket"
);
let config = {};
ws.on("close", () => {
console.log("ws close");
});
ws.on("open", function open() {
console.log("ws open");
});
ws.on("message", data => {
const engineIoPacket = EngineIoParser.decodePacket(data);
console.log("on msg", data,engineIoPacket);
if (engineIoPacket.type === "open") {
console.log("on open", engineIoPacket);
config = JSON.parse(engineIoPacket.data);
} else if (engineIoPacket.type === "message") {
console.log("on message");
const socketIoPacket = SocketIoParser.decodeString(engineIoPacket.data);
console.log("socketIoPacket",engineIoPacket, socketIoPacket);
if (socketIoPacket.type === SocketIoParser.CONNECT) {
console.log("socketIoPacket", socketIoPacket,"connect");
onConnect(ws);
ping(ws);
} else if (socketIoPacket.type === SocketIoParser.EVENT) {
onReceEvent(ws, {
name: socketIoPacket.data[0],
data: socketIoPacket.data[1]
});
}
} else if (engineIoPacket.type === "pong") {
console.log("on pong");
ping(ws);
}
});
function buildPingPacket(packet) {
return EngineIoParser.encodeStringPacket({ type: "ping" });
}
function buildCustomEventPacket({ name, data }) {
const packet = {
type: SocketIoParser.EVENT,
data: [name, data],
options: { compress: true },
nsp: "/"
};
const encoded = SocketIoParser.encodeAsString(packet);
return EngineIoParser.encodeStringPacket({ type: "message", data: encoded });
}
function sendEvent(ws, name, data) {
ws.send(
buildCustomEventPacket({
name,
data
}),
{ compress: false }
);
}
function onConnect(ws) {
sendEvent(ws, "event", "lalala test send to server");
}
function onReceEvent(ws, { name, data }) {
console.log("onReceEvent", name, data);
}
function ping(ws) {
setTimeout(() => {
const pingPacket = buildPingPacket();
console.log("ping",pingPacket);
ws.send(pingPacket, { compress: false });
}, 1000 * 3);
}
因此理论上来讲只要有websocket库实现socket-io client应该不是一件难事 然而今天最起码花费了我大概4-5小时时间 成功卡住了 不得不重头来开始调研缺失的知识 主要就是tokio的使用 socket-io的连接过程大概是websocket连接到server,server返回id和连接参数,然后server向client发送open事件,client响应open事件根据连接参数设置定时器去发送ping请求给server,server接收到ping立即发pong回来,所谓的心跳机制实际上就是这个不断重复的ping-pong的过程,但server端的pong没有在给定时间内发送过来时,客户端就会触发超时事件.
rust的websocket库 websocket的异步接口将websocket抽象成feature stream 我的想法大概同样 将socket-io 同样抽象成feature stream,对于stream做map应该不是问题,但如何完成心跳是我现在苦恼的地方.
tokio
我对于tokio future 的理解大概就是 future代表的是通过结构封装起来的函数执行顺序 tokio每次会去poll这个future一次,每次都会导致future内部的变化,使其从NotReady到Ready状态.同时提供了一些异步实现例如文件IO,网络流等.每次pollfuture时实际上没有等待IO的阻塞操作,例如当调用readfile时如果readfile返回的是NotReady那么就直接去poll下一个future了 等到操作系统通知文件读取完毕再去poll一次这个future 这时read_file的返回值就是Ready(Data)这样来达到无阻塞的目的.mio库封装了操作系统文件读取等的回调.
问题1. 谁去pull future again?
fn main() {
let mut runtime = tokio::runtime::current_thread::Builder::new()
.build()
.unwrap();
runtime.block_on(run().unwrap()).unwrap();
}
fn run() -> Result<impl Future<Item=(), Error=failure::Error>, Error> {
let res = SocketIoStream {}
.for_each(|event| {
println!("socket-io event {:?}", event);
Ok(())
}).into_future();
Ok(res)
}
impl Stream for SocketIoStream {
type Item = SocketIoEvent;
type Error = Error;
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
info!("ss poll");
Ok(Async::NotReady)
}
}
会发现只打了一次log,这就是之前提到的future只会被pull一次的导致的,我们必须将这个future与某些事件关联起来,当发生这些事件时,重新来poll一次future才能使这个状态机从初始状态转化为终结状态
timeout
正如之前的介绍 心跳机制事件上是一系列定时触发的回调,也就是一堆状态机只不过触发状态变化的是timer
digraph G {
"open" -> "ping" [label="ping interval"]
"ping" -> "ping_over" [label="socket"]
"ping_over" -> "pong_checking" [label="pong timeout"]
"pong_checking" -> "heartbeat_ok" [label="ws received pong"]
"pong_checking" -> "heartbeat_fail" [label="ws not received pong"]
"heartbeat_ok" ->"ping" [label="ping interval"]
}